feat: add decompress interceptor #3274
base: main
Conversation
Force-pushed from 938fa4b to df4804c
Should we also handle rawdeflate?
Yep, I'm using the existing
lib/interceptor/decompress.js (Outdated)
  this.#inputStream,
  ...restDecoders,
  this.#onDecompressStreamFinished.bind(this)
).on('data', (chunk) => this.#handler.onData(chunk))
Missing backpressure
Documentation seems lost
  }
}

function createDecompressionInterceptor () {
We might need to support hinting the supported encodings, in case implementers want to limit this (I can only think of performance reasons).
They could pass a set of supported encodings, and the decompress interceptor would advertise it via accept-encoding.
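A hedged sketch of what such an option could look like. The option shape and helper name are hypothetical, not undici API; it simply filters a requested set against what the interceptor can decode and joins it into an accept-encoding value.

```javascript
// Codings the interceptor could decode (illustrative list)
const SUPPORTED = ['gzip', 'br', 'deflate']

// Hypothetical helper: build the accept-encoding header from an
// implementer-supplied subset, falling back to everything supported.
function buildAcceptEncoding (encodings = SUPPORTED) {
  const allowed = encodings.filter((e) => SUPPORTED.includes(e))
  if (allowed.length === 0) {
    throw new TypeError('no supported encodings selected')
  }
  return allowed.join(', ')
}
```

For example, `buildAcceptEncoding(['gzip'])` would advertise only gzip, letting an implementer avoid the cost of brotli decompression.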
outputStream.on('data', (chunk) => {
  if (!this.#handler.onData(chunk)) {
    this.#inputStream.pause()
    this.#inputStream.once('drain', () => this.#inputStream.resume())
This is wrong. Drain is signaled through the resume function passed to onHeaders.
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
if (requestEncodings.length !== 0 && method !== 'HEAD' && method !== 'CONNECT' && !nullBodyStatus.includes(statusCode)) {
  const decoders = []
  for (let i = 0; i < requestEncodings.length; ++i) {
Suggested change:
- for (let i = 0; i < requestEncodings.length; ++i) {
+ for (let i = requestEncodings.length - 1; i >= 0; --i) {
The other way round, right?
Yeah, I made the same change in #3343.
Thanks, I'll come back to this PR this week
  }
}

return this.#handler.onHeaders(
Thanks for putting up this PR! I added this interceptor into my project.
This interceptor could also be used with fetch if you remove the content-encoding header. My use case: I have a LoggingInterceptor that sends the response body to GCP logging. This worked great, except one API sends a gzipped response. Now I have the following flow:
Decompress -> Logging -> Fetch
delete parsedHeaders['content-encoding'];
const newRawHeaders = Object.entries(parsedHeaders)
.flat()
.map((e) => Buffer.from(e));
return this.#handler.onHeaders(statusCode, newRawHeaders, resume, statusMessage);
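One caveat with the snippet above: if the parsed headers contain array values (repeated headers such as set-cookie), `Buffer.from` would be handed an array. A sketch of a rebuild that also handles that case (helper name is illustrative; the fuller onHeaders the commenter posted does the same thing inline):

```javascript
// Hypothetical helper: rebuild rawHeaders from a parsed-headers object,
// expanding array values into repeated [key, value] pairs.
function toRawHeaders (parsedHeaders) {
  return Object.entries(parsedHeaders)
    .map(([key, value]) =>
      Array.isArray(value) ? value.flatMap((v) => [key, v]) : [key, value])
    .flat()
    .map((v) => Buffer.from(v))
}
```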
Welp - I ended up having issues with the interceptor. Haven't been able to dive in yet, but I feel like my flow should work. Will try again once this is reviewed/merged.
Last piece for this to work for me with fetch: fetch does some sort of backpressuring.

const wrappedResume = () => {
  if (this.#inputStream) {
    this.#inputStream.resume();
  }
  return resume();
};
return this.#handler.onHeaders!(statusCode, newRawHeaders, wrappedResume, statusText);

Full onHeaders:

onHeaders(statusCode: number, rawHeaders: Buffer[], resume: () => void, statusText: string) {
const parsedHeaders = util.parseHeaders(rawHeaders);
const contentEncoding = parsedHeaders['content-encoding'] as string | undefined;
const requestEncodings = contentEncoding ? contentEncoding.split(',').map((e) => e.trim().toLowerCase()) : [];
const { method } = this.#opts;
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
if (
requestEncodings.length !== 0 &&
method !== 'HEAD' &&
method !== 'CONNECT' &&
!nullBodyStatus.includes(statusCode)
) {
const decoders: Transform[] = [];
for (let i = requestEncodings.length - 1; i >= 0; --i) {
const requestEncoding = requestEncodings[i];
// https://www.rfc-editor.org/rfc/rfc9112.html#section-7.2
if (requestEncoding === 'x-gzip' || requestEncoding === 'gzip') {
decoders.push(
zlib.createGunzip({
// Be less strict when decoding compressed responses, since sometimes
// servers send slightly invalid responses that are still accepted
// by common browsers.
// Always using Z_SYNC_FLUSH is what cURL does.
flush: zlib.constants.Z_SYNC_FLUSH,
finishFlush: zlib.constants.Z_SYNC_FLUSH,
}),
);
} else if (requestEncoding === 'deflate') {
throw new NotImplementedException('deflate is not supported');
} else if (requestEncoding === 'br') {
decoders.push(zlib.createBrotliDecompress());
} else {
decoders.length = 0;
break;
}
}
if (decoders.length !== 0) {
const [firstDecoder, ...restDecoders] = decoders;
this.#inputStream = firstDecoder;
this.#inputStream.on('drain', () => {
if (this.#inputStream) {
this.#inputStream.resume();
}
});
let outputStream = firstDecoder;
if (restDecoders.length !== 0) {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-expect-error
outputStream = pipeline(this.#inputStream, ...restDecoders, this.#onDecompressStreamFinished.bind(this));
} else {
finished(this.#inputStream, this.#onDecompressStreamFinished.bind(this));
}
outputStream.on('data', (chunk) => {
if (!this.#handler.onData!(chunk)) {
if (this.#inputStream) {
this.#inputStream.pause();
}
}
});
}
}
delete parsedHeaders['content-encoding'];
const newRawHeaders = Object.entries(parsedHeaders)
.map(([key, value]) => {
if (Array.isArray(value)) {
return value.map((v) => [key, v]).flat();
} else {
return [key, value];
}
})
.flat()
.map((v) => Buffer.from(v));
const wrappedResume = () => {
if (this.#inputStream) {
this.#inputStream.resume();
}
return resume();
};
return this.#handler.onHeaders!(statusCode, newRawHeaders, wrappedResume, statusText);
}
@tjhiggins Do you want to collaborate on this PR with me? I can give you commit access to the base branch. I think the interceptor interface has changed a little since I opened this PR. I can work on ensuring it works with the latest version of Undici. Once I've done that, do you want to add the changes above for onHeaders and backpressure? Sound good?
I only just saw the new hooks for v7. Fetch does decompress, but after the interceptors in v6. Maybe with the new hook
Added a new interceptor to decompress the response body. I lifted some of the implementation from the way decompression is handled in the fetch client.